package com.xiaojie.rocket.listener;

import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.xiaojie.rocket.mapper.OrderMapper;
import com.xiaojie.rocket.pojo.Order;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Component;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/**
 * @author xiaojie
 * @version 1.0
 * @description: 回调监听
 * @date 2021/11/14 23:59
 */
@Slf4j
@Component
@RocketMQTransactionListener
public class ProducerListener implements RocketMQLocalTransactionListener {

    @Autowired
    private OrderMapper orderMapper;

    /**
     * @description: 执行本地事务
     * @author xiaojie
     * @date 2021/11/15 0:05
     * @version 1.0
     */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
        try {
            if (message == null) {
                return null;
            }
            String msg = new String((byte[]) message.getPayload());
            log.info("发送的消息是message>>>>>>>>>",msg);
            Order order = JSONObject.parseObject(msg, Order.class);
            int insert = orderMapper.insert(order);
            if (insert>0){
                //事务执行成功
                return RocketMQLocalTransactionState.COMMIT;
            }else{
                return RocketMQLocalTransactionState.ROLLBACK;
            }
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /**
     * @description: 检查本地事务
     * @author xiaojie
     * @date 2021/11/15 0:06
     * @version 1.0
     */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
        try {
            if (message == null) {
                //如果为空，有可能是网络原因，不能删除数据，继续重试
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            String msg = new String((byte[]) message.getPayload());
            log.info("发送的消息是message>>>>>>>>>",msg);
            Order order = JSONObject.parseObject(msg, Order.class);
            QueryWrapper queryWrapper=new QueryWrapper(order.getOrderid());
            Order dbOrder = orderMapper.selectOne(queryWrapper);
            if (dbOrder == null) {
                return RocketMQLocalTransactionState.UNKNOWN;
            }
            return RocketMQLocalTransactionState.COMMIT;
        } catch (Exception e) {
            e.printStackTrace();
            return RocketMQLocalTransactionState.UNKNOWN;
        }
    }
}
